[SPARK-24479][SS] Added config for registering streamingQueryListeners #21504
|
Mind fixing the PR title to |
private var lastTerminatedQuery: StreamingQuery = null

sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS)
  .foreach { classNames =>
nit:
sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
...
}
merlintang
left a comment
LGTM +1
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryTerminatedEvent, _}
nit: import looks a bit odd {QueryStartedEvent, QueryTerminatedEvent, _}.
Test build #91510 has finished for PR 21504 at commit
@HyukjinKwon, thanks for reviewing. Addressed comments.
Test build #91532 has finished for PR 21504 at commit
    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
      sparkSession.sparkContext.conf).foreach(addListener)
  }
Two comments here:
- We need to log the registration information here.
- We need to wrap this in a try/catch: registration can fail, and an unhandled failure would break the job.
Good point. Addressed, please check.
HyukjinKwon
left a comment
Seems fine.
Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
  sparkSession.sparkContext.conf).foreach(listener => {
  addListener(listener)
  logInfo(s"Registered listener ${listener.getClass.getName}")
I would do this at debug level ..
Either debug or info is fine for me, since it would add just a couple of log lines, and only once.
Since it's logged only once and provides information to the user, I guess info is fine. There is a similar pattern here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359
  }
} catch {
  case e: Exception =>
    throw new SparkException(s"Exception when registering StreamingQueryListener", e)
nit: s seems not needed.
Addressed
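For readers following the thread, the pattern the reviewers converged on (load each configured class by name, register it, log once per listener, and wrap any failure in a single descriptive exception) can be sketched in plain Scala. This is only an illustration under stated assumptions: `Listener`, `AuditListener`, `RegistrationException`, and `ListenerRegistry` are hypothetical stand-ins, not Spark classes, and the reflective loading is a simplified substitute for what `Utils.loadExtensions` does in the diff.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for StreamingQueryListener; not the Spark class.
trait Listener

// Example listener with the zero-argument constructor that reflective loading needs.
class AuditListener extends Listener

// Hypothetical wrapper exception, playing the role SparkException has in the diff.
class RegistrationException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

object ListenerRegistry {
  val registered: ArrayBuffer[Listener] = ArrayBuffer.empty[Listener]

  // classNames mirrors an optional config value holding comma-separated class names.
  def registerFromConf(classNames: Option[String]): Unit = {
    try {
      classNames.foreach { names =>
        names.split(",").map(_.trim).filter(_.nonEmpty).foreach { className =>
          // Instantiate the class reflectively through its no-arg constructor.
          val listener = Class.forName(className)
            .getDeclaredConstructor()
            .newInstance()
            .asInstanceOf[Listener]
          registered += listener
          // Logged once per listener, matching the info-level message discussed above.
          println(s"Registered listener ${listener.getClass.getName}")
        }
      }
    } catch {
      // Surface any loading or casting failure as one descriptive error.
      case NonFatal(e) =>
        throw new RegistrationException("Exception when registering listener", e)
    }
  }
}
```

Calling `ListenerRegistry.registerFromConf(Some(classOf[AuditListener].getName))` registers one listener, `registerFromConf(None)` is a no-op, and an unknown class name surfaces as a `RegistrationException` whose cause is the underlying `ClassNotFoundException`.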
HeartSaVioR
left a comment
LGTM except comments from @HyukjinKwon.
|
Test build #91540 has finished for PR 21504 at commit
|
HyukjinKwon
left a comment
LGTM
Test build #91570 has finished for PR 21504 at commit
retest this please
Test build #91577 has finished for PR 21504 at commit
retest this, please
Test build #91585 has finished for PR 21504 at commit
Test failures were from kafka. retest this, please
retest this please
Test build #91645 has finished for PR 21504 at commit
  .toSequence
  .createOptional

val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
maybe -> spark.sql.streaming.streamingQueryListeners for consistency.
ok makes sense. renamed.
@HyukjinKwon, addressed comments. Can you take it forward?
Looks fine but let me leave this open for more days before merging it in case someone has some comments on it.
Test build #91715 has finished for PR 21504 at commit
retest this please
Test build #91737 has finished for PR 21504 at commit
override protected def sparkConf: SparkConf =
  super.sparkConf.set("spark.sql.streamingQueryListeners",
seems a mistake here.
Change-Id: I030ef8a27ec2962170981f2d1b4ed16165e2aef9
(Don't block on me - I won't have time to review in detail unless needed, but broadly the PR looks fine)
Test build #91741 has finished for PR 21504 at commit
retest this please
Test build #91761 has finished for PR 21504 at commit
Merged to master.
What changes were proposed in this pull request?
Currently a "StreamingQueryListener" can only be registered programmatically. We could have a new config "spark.sql.streamingQueryListeners" similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners.
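As a rough illustration of how a user might rely on this config, the sketch below defines a listener and names it in the session configuration instead of calling `addListener` in code. It assumes the key as renamed during review, `spark.sql.streaming.streamingQueryListeners`, and a Spark version that includes this change. `LoggingQueryListener` and `ListenerConfigExample` are hypothetical names. Because the diff loads classes reflectively, the listener is assumed to need a constructor the loader can call, such as a zero-argument one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener; it only prints basic lifecycle information.
class LoggingQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Rows in last batch: ${event.progress.numInputRows}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

object ListenerConfigExample {
  def main(args: Array[String]): Unit = {
    // The config is defined as a static conf in the diff, so it is set
    // before the session is created rather than changed afterwards.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("listener-config-example")
      .config(
        "spark.sql.streaming.streamingQueryListeners",
        classOf[LoggingQueryListener].getName)
      .getOrCreate()

    // Assumption: the configured listeners are loaded when the session's
    // StreamingQueryManager is first created, which spark.streams triggers.
    spark.streams

    spark.stop()
  }
}
```

The same value could presumably be supplied at submit time with `--conf spark.sql.streaming.streamingQueryListeners=<fully qualified class name>`, with several class names separated by commas.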
How was this patch tested?
New unit test and running example programs.